package io.sqrtqiezi.spark.lagou

import org.apache.spark.sql.SparkSession

object LogAnalysis {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[*]")
      .appName(this.getClass.getCanonicalName.init)
      .getOrCreate()

    val logRDD = spark.sparkContext.textFile("lagou-data/cdn.txt")
      .map(line => line.split(" "))

    // 100.79.121.48 HIT 33 [15/Feb/2017:00:00:46 +0800] "GET http://cdn.v.abc.com.cn/videojs/video.js HTTP/1.1" 200 174055 "http://www.abc.com.cn/" "Mozilla/4.0+(compatible;+MSIE+6.0;+Windows+NT+5.1;+Trident/4.0;)"
    // 1. 统计独立 IP 数
    val count = logRDD.map(_(0)).distinct.count
    println(count) // 43649

    // 2. 每个视频文件的独立 IP 数
    val mp4Count = logRDD
      .map(item => ("\\d+\\.mp4".r.findFirstIn(item(6)).getOrElse(""), item(0))) // 从访问地址中提取 mp4 文件名
      .filter { case (key, _) => key != "" } // 过滤掉非 mp4
      .distinct       // 去重
      .countByKey     // 按文件名统计 ip

    for((name, count) <- mp4Count.toArray.sortWith((lt, rt) => lt._2 > rt._2))
      println(s"$name : $count")

    // 3. 一天中每个小时的流量
    val pvByHour = logRDD.map(_ (3))
      //.map(item => item.substring(1, 12)) // 日期
      .map(_.split(":")(1)) // 小时
      .countByValue

    for((name, count) <- pvByHour.toArray.sortBy(_._1))
      println(s"$name : $count")

    spark.stop()
  }
}
